package com.wx.learn.kafka.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

/**
 * Demo Kafka producer: sends 100 string messages to topic {@code test1} and,
 * via an async send callback, logs the partition/offset/timestamp of each
 * acknowledged record (or the stack trace on failure).
 *
 * @author jxlgzzw
 * @date 2020-06-06 15:37
 */
public class ProducerTest {
    public static void main(String[] args) {
        Properties prop = new Properties();
        // Serialize both keys and values as strings.
        prop.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        prop.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        // Bootstrap broker list; the client discovers the rest of the cluster from these.
        prop.setProperty("bootstrap.servers", "ud2:9092,ud3:9092,ud4:9092");

        // try-with-resources guarantees the producer is closed (which also flushes
        // buffered records) even if an exception escapes the send loop — the
        // original leaked the producer on any exception before close().
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(prop)) {
            for (int count = 1; count <= 100; count++) {

                // Record with no explicit key: the default partitioner picks the partition.
                ProducerRecord<String, String> record = new ProducerRecord<>("test1", count+"这是生产者生产的消息。。。");
                System.out.println("----> "+record);

                // Asynchronous send; the callback runs on the producer's I/O thread
                // once the broker acknowledges the record or the send fails.
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e != null) {
                            e.printStackTrace();
                        } else {
                            int partition = recordMetadata.partition();
                            long offset = recordMetadata.offset();
                            long timestamp = recordMetadata.timestamp();
                            System.out.println("---->"+partition+"---"+offset+"---"+timestamp);
                        }
                    }
                });
            }
            // close() flushes too, but the explicit flush preserves the original
            // behavior of blocking until all in-flight records are sent.
            producer.flush();
        }
    }
}
